{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "2ac8b4d4",
   "metadata": {},
   "source": [
    "\n",
    "# Workflow Setup with Caching in Agno\n",
    "\n",
    "This example demonstrates how to create efficient, stateful workflows in Agno that orchestrate complex agent interactions while maintaining performance through caching and state management.\n",
    "\n",
    "## Overview\n",
    "This example shows how to build reusable agent workflows where we:\n",
    "\n",
    "1. **Design workflow architecture** with custom logic and agent orchestration\n",
    "2. **Implement caching mechanisms** to store and reuse expensive computations\n",
    "3. **Manage session state** to maintain context across multiple interactions\n",
    "4. **Set up response streaming** for real-time output handling\n",
    "\n",
    "By using workflows, you can create sophisticated agent pipelines that are both performant and maintainable, with built-in optimizations for repeated operations and long-running sessions.\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "40bf672a",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Install the required dependencies:\n",
    "%pip install agentops\n",
    "%pip install agno\n",
    "%pip install python-dotenv"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ee482533",
   "metadata": {
    "lines_to_next_cell": 2
   },
   "outputs": [],
   "source": [
    "import os\n",
    "from dotenv import load_dotenv\n",
    "\n",
    "import agentops\n",
    "from agno.agent import Agent, RunResponse\n",
    "from agno.workflow import Workflow\n",
    "from agno.utils.pprint import pprint_run_response\n",
    "from agno.models.openai import OpenAIChat\n",
    "from agno.utils.log import logger\n",
    "from typing import Iterator"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "eddbb872",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Load environment variables\n",
    "load_dotenv()\n",
    "os.environ[\"OPENAI_API_KEY\"] = os.getenv(\"OPENAI_API_KEY\", \"your_openai_api_key_here\")\n",
    "os.environ[\"AGENTOPS_API_KEY\"] = os.getenv(\"AGENTOPS_API_KEY\", \"your_agentops_api_key_here\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6f234791",
   "metadata": {},
   "outputs": [],
   "source": [
    "agentops.init(auto_start_session=False, tags=[\"agno-example\", \"workflow-setup\"])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "id": "ff36679e",
   "metadata": {},
   "outputs": [],
   "source": [
    "class CacheWorkflow(Workflow):\n",
    "    \"\"\"\n",
    "    A workflow that demonstrates intelligent caching capabilities.\n",
    "\n",
    "    This workflow:\n",
    "    - Caches agent responses to avoid redundant API calls\n",
    "    - Maintains session state across multiple invocations\n",
    "    - Provides instant responses for repeated queries\n",
    "    - Reduces costs and improves performance\n",
    "\n",
    "    Use cases:\n",
    "    - FAQ systems where questions repeat frequently\n",
    "    - Development/testing to avoid repeated API calls\n",
    "    - Systems with predictable query patterns\n",
    "    \"\"\"\n",
    "\n",
    "    # Workflow metadata (descriptive, not functional)\n",
    "    description: str = \"A workflow that caches previous outputs for efficiency\"\n",
    "\n",
    "    # Initialize agents as workflow attributes\n",
    "    # This agent will be used to generate responses when cache misses occur\n",
    "    agent = Agent(model=OpenAIChat(id=\"gpt-4o-mini\"), description=\"General purpose agent for generating responses\")\n",
    "\n",
    "    def run(self, message: str) -> Iterator[RunResponse]:\n",
    "        \"\"\"\n",
    "        Execute the workflow with caching logic.\n",
    "\n",
    "        This method:\n",
    "        1. Checks if the response is already cached\n",
    "        2. Returns cached response immediately if found\n",
    "        3. Generates new response if not cached\n",
    "        4. Caches the new response for future use\n",
    "\n",
    "        Args:\n",
    "            message: The input query to process\n",
    "\n",
    "        Yields:\n",
    "            RunResponse: Streamed response chunks\n",
    "        \"\"\"\n",
    "        logger.info(f\"Checking cache for '{message}'\")\n",
    "\n",
    "        if self.session_state.get(message):\n",
    "            logger.info(f\"Cache hit for '{message}'\")\n",
    "            # Return cached response immediately (no API call needed)\n",
    "            yield RunResponse(run_id=self.run_id, content=self.session_state.get(message))\n",
    "            return\n",
    "\n",
    "        logger.info(f\"Cache miss for '{message}'\")\n",
    "\n",
    "        yield from self.agent.run(message, stream=True)\n",
    "\n",
    "        self.session_state[message] = self.agent.run_response.content\n",
    "        logger.info(\"Cached response for future use\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "id": "b1e9e06c",
   "metadata": {},
   "outputs": [],
   "source": [
    "def demonstrate_workflows():\n",
    "    \"\"\"\n",
    "    Demonstrate workflow capabilities with caching.\n",
    "\n",
    "    This function shows:\n",
    "    - How to create and use custom workflows\n",
    "    - The performance benefits of caching\n",
    "    - Session state persistence\n",
    "    - Response streaming\n",
    "    \"\"\"\n",
    "\n",
    "    tracer = agentops.start_trace(trace_name=\"Agno Workflow Setup Demonstration\")\n",
    "    try:\n",
    "        workflow = CacheWorkflow()\n",
    "\n",
    "        response: Iterator[RunResponse] = workflow.run(message=\"Tell me a joke.\")\n",
    "\n",
    "        pprint_run_response(response, markdown=True, show_time=True)\n",
    "\n",
    "        response: Iterator[RunResponse] = workflow.run(message=\"Tell me a joke.\")\n",
    "\n",
    "        pprint_run_response(response, markdown=True, show_time=True)\n",
    "\n",
    "        agentops.end_trace(tracer, end_state=\"Success\")\n",
    "\n",
    "    except Exception:\n",
    "        agentops.end_trace(tracer, end_state=\"Error\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d09d8f70",
   "metadata": {},
   "outputs": [],
   "source": [
    "demonstrate_workflows()"
   ]
  }
 ],
 "metadata": {
  "jupytext": {
   "cell_metadata_filter": "-all",
   "main_language": "python",
   "notebook_metadata_filter": "-all"
  },
  "kernelspec": {
   "display_name": "venv",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
